/**
 *  Copyright (c) 1997-2013, www.tinygroup.org (luo_guo@icloud.com).
 *
 *  Licensed under the GPL, Version 3.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.gnu.org/licenses/gpl.html
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.tinygroup.dbrouterjdbc3.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.tinygroup.cache.Cache;
import org.tinygroup.commons.tools.Assert;
import org.tinygroup.commons.tools.CollectionUtil;
import org.tinygroup.dbrouter.RouterKeyGenerator;
import org.tinygroup.dbrouter.RouterManager;
import org.tinygroup.dbrouter.StatementProcessor;
import org.tinygroup.dbrouter.cache.CacheKey;
import org.tinygroup.dbrouter.config.Partition;
import org.tinygroup.dbrouter.config.Router;
import org.tinygroup.dbrouter.config.Shard;
import org.tinygroup.dbrouter.context.RealStatementExecutor;
import org.tinygroup.dbrouter.context.ResultSetExecutor;
import org.tinygroup.dbrouter.context.StatementExecuteContext;
import org.tinygroup.dbrouter.exception.DbrouterRuntimeException;
import org.tinygroup.dbrouter.factory.RouterManagerBeanFactory;
import org.tinygroup.dbrouter.impl.InsertSqlTransform;
import org.tinygroup.dbrouter.impl.InsertSqlTransform.ColumnInfo;
import org.tinygroup.dbrouter.util.DbRouterUtil;
import org.tinygroup.dbrouter.util.ParamObjectBuilder;
import org.tinygroup.dbrouterjdbc3.thread.ExecuteQueryCallBack;
import org.tinygroup.dbrouterjdbc3.thread.ExecuteResultCallable;
import org.tinygroup.dbrouterjdbc3.thread.ExecuteUpdateCallBack;
import org.tinygroup.jsqlparser.expression.Expression;
import org.tinygroup.jsqlparser.expression.LongValue;
import org.tinygroup.jsqlparser.expression.StringValue;
import org.tinygroup.jsqlparser.expression.operators.relational.ExpressionList;
import org.tinygroup.jsqlparser.expression.operators.relational.ItemsList;
import org.tinygroup.jsqlparser.schema.Column;
import org.tinygroup.jsqlparser.statement.insert.Insert;
import org.tinygroup.jsqlparser.statement.select.Select;
import org.tinygroup.logger.LogLevel;
import org.tinygroup.logger.Logger;
import org.tinygroup.logger.LoggerFactory;

/**
 * 功能说明:
 * <p/>
 * <p/>
 * 开发人员: renhui <br>
 * 开发时间: 2013-12-24 <br>
 * <br>
 */
public class TinyStatement implements Statement {
	protected ThreadLocal<Map<Shard, Statement>> statementMap = new ThreadLocal<Map<Shard, Statement>>();
	protected final TinyConnection tinyConnection;
	protected RouterManager routerManager = RouterManagerBeanFactory
			.getManager();
	protected final Router router;
	protected boolean isClosed;
	protected int maxRows;
	protected boolean escapeProcessing = true;
	protected int queryTimeout = 5;
	protected ResultSet resultSet;
	protected int updateCount;
	protected final boolean closedByResultSet;
	protected final int resultSetType;
	protected final int resultSetConcurrency;
	protected boolean cancelled;
	protected int fetchSize = 100;
	protected boolean autoCommit = true;
	protected ParamObjectBuilder builder;

	protected ThreadPoolExecutor pool;
	protected static Logger logger = LoggerFactory
			.getLogger(TinyStatement.class);

	public TinyStatement(Router router, TinyConnection tinyConnection,
			int resultSetType, int resultSetConcurrency,
			boolean closedByResultSet, boolean autoCommit) {
		Assert.assertNotNull(tinyConnection, "tinyConnection must not null");
		this.router = router;
		this.tinyConnection = tinyConnection;
		this.closedByResultSet = closedByResultSet;
		this.resultSetType = resultSetType;
		this.resultSetConcurrency = resultSetConcurrency;
		this.autoCommit = autoCommit;
		int threadSize = router.getThreadSize();
		if (threadSize > 0) {
			this.pool = new ThreadPoolExecutor(threadSize, threadSize, 0L,
					TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
		}
		statementMap.set(new HashMap<Shard, Statement>());
	}

	public ResultSet executeQuery(String sql) throws SQLException {
		checkClosed();
		closeOldResultSet();
		StatementExecuteContext context = initStatementContext(sql);
		resultSet = createResultByContext(sql, context);
		return resultSet;
	}

	private ResultSet createResultByContext(String sql,
			StatementExecuteContext context) throws SQLException {
		List<RealStatementExecutor> statements = getStatementsByContext(context);
		context.setStatements(statements);
		int statementSize = statements.size();
		if (statementSize == 1) {
			return createSingleResultSet(sql, statements);
		} else if (statementSize > 1) {
			return createMultiResultSet(sql, context);
		}
		return null;
	}

	private StatementExecuteContext initStatementContext(String sql)
			throws SQLException {
		StatementExecuteContext context = new StatementExecuteContext();
		context.setOrignalSql(sql);
		context.setTinyConnection(tinyConnection);
		context.setTinyStatement(this);
		context.setRouter(router);
		context.setBuilder(builder);
		Partition partition = routerManager.getPartition(router, sql);
		context.setPartition(partition);
		return context;
	}

	private ResultSet createSingleResultSet(String sql,
			List<RealStatementExecutor> statements) throws SQLException {
		ResultSet resultSet = statements.get(0).executeQuery();
		return new TinyResultSetWrapper(sql, resultSet);
	}

	/**
	 * 存在多个分片，创建合并多个分片的结果集
	 * 
	 * @param sql
	 * @param statements
	 * @param statementSize
	 * @return
	 * @throws SQLException
	 */
	private ResultSet createMultiResultSet(String sql,
			StatementExecuteContext context) throws SQLException {
		List<RealStatementExecutor> statements = context.getRealStatements();
		int statementSize = statements.size();
		boolean existIdleThread = existIdleThread(statementSize);
		List<ResultSetExecutor> resultSetExecutors = new ArrayList<ResultSetExecutor>();
		if (existIdleThread) {
			resultSetExecutors = createResultsInMultiThread(sql, statements,
					statementSize);
		} else {
			resultSetExecutors = createResultsWithStatements(statements);
		}
		context.setResultSetExecutors(resultSetExecutors);
		StatementProcessor statementProcessor = context.getStatementProcessor();
		if (statementProcessor != null) {
			return statementProcessor.combineResult(statements.get(0)
					.getExecuteSql(), context);
		} else {
			return new TinyResultSetMultiple(context);
		}
	}

	private List<ResultSetExecutor> createResultsWithStatements(
			List<RealStatementExecutor> statements) throws SQLException {
		List<ResultSetExecutor> resultSetExecutors = new ArrayList<ResultSetExecutor>();
		for (RealStatementExecutor statement : statements) {
			ResultSet realResultSet = statement.executeQuery();
			resultSetExecutors.add(new ResultSetExecutor(realResultSet,
					statement.getExecuteSql(), statement.getOriginalSql(),
					statement.getShard(), statement.getPartition(), statement
							.getRouter()));
		}
		return resultSetExecutors;
	}

	private List<ResultSetExecutor> createResultsInMultiThread(String sql,
			List<RealStatementExecutor> statements, int statementSize) {
		List<ResultSetExecutor> resultSetExecutors = new ArrayList<ResultSetExecutor>();
		long startTime = System.currentTimeMillis();
		List<Future<ResultSetExecutor>> futures = new ArrayList<Future<ResultSetExecutor>>();
		for (int j = 0; j < statementSize; j++) {
			RealStatementExecutor realStatementExecutor = statements.get(j);
			ExecuteResultCallable<ResultSetExecutor> callable = new ExecuteResultCallable<ResultSetExecutor>(
					realStatementExecutor);
			callable.setCallBack(new ExecuteQueryCallBack());
			Future<ResultSetExecutor> future = pool.submit(callable);
			futures.add(future);
		}
		for (Future<ResultSetExecutor> future : futures) {
			try {
				ResultSetExecutor resultSetExecutor = future.get();
				resultSetExecutors.add(resultSetExecutor);
			} catch (Exception e) {
				logger.errorMessage("查询sql:<{0}>出错", e, sql);
			}
		}
		long endTime = System.currentTimeMillis();
		logger.logMessage(LogLevel.DEBUG, "本次查询总执行时间：{}", endTime - startTime);
		return resultSetExecutors;
	}

	private boolean existIdleThread(int statementSize) {
		if (pool == null) {
			return false;
		}
		int threadSize = router.getThreadSize();
		if (pool.getActiveCount() + statementSize < threadSize) {
			return true;
		}
		return false;
	}

	public int executeUpdate(String sql) throws SQLException {
		checkClosed();
		closeOldResultSet();
		StatementExecuteContext context = initStatementContext(sql);
		updateCount = executeByContext(context);
		return updateCount;
	}

	private int executeByContext(StatementExecuteContext context)
			throws SQLException {
		List<RealStatementExecutor> statements = getStatementsByContext(context);
		int statementSize = statements.size();
		if (statementSize == 1) {
			return statements.get(0).executeUpdate();
		} else {
			checkExecuteStatment(context);
			boolean existIdleThread = existIdleThread(statementSize);
			if (existIdleThread) {
				return executeInMultiThread(statements, context.getOrignalSql());
			}
			return executeByShards(statements);
		}

	}

	private int executeByShards(List<RealStatementExecutor> statements)
			throws SQLException {
		int updateCount = 0;
		for (RealStatementExecutor statement : statements) {
			updateCount += statement.executeUpdate();
		}
		return updateCount;
	}

	private int executeInMultiThread(List<RealStatementExecutor> statements,
			String sql) {
		int statementSize = statements.size();
		int updateCount = 0;
		long startTime = System.currentTimeMillis();
		List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
		for (int j = 0; j < statementSize; j++) {
			RealStatementExecutor realStatementExecutor = statements.get(j);
			ExecuteResultCallable<Integer> callable = new ExecuteResultCallable<Integer>(
					realStatementExecutor);
			callable.setCallBack(new ExecuteUpdateCallBack());
			Future<Integer> future = pool.submit(callable);
			futures.add(future);
		}
		for (Future<Integer> future : futures) {
			try {
				updateCount += future.get();
			} catch (Exception e) {
				logger.errorMessage("执行sql:<{0}>出错", e, sql);
			}
		}
		long endTime = System.currentTimeMillis();
		logger.logMessage(LogLevel.DEBUG, "本次总执行时间：{}", endTime - startTime);
		return updateCount;
	}

	private void checkExecuteStatment(StatementExecuteContext context)
			throws SQLException {
		if (context.getPartition().getMode() == Partition.MODE_PRIMARY_SLAVE
				&& tinyConnection.getAutoCommit()) {
			throw new RuntimeException(
					"primary slave mode exist one more write database,the connection autocommit must set false");
		}
	}

	protected Statement getStatement(Shard shard, String executeSql)
			throws SQLException {
		Statement statement = statementMap.get().get(shard);
		if (tinyConnection.getAutoCommit() != autoCommit) {// 有调用过tinyconnection.setAutoCommit(),重写创建statement
			logger.logMessage(
					LogLevel.DEBUG,
					"autoCommit has change,original:{0}，now:{1},create new statement",
					autoCommit, tinyConnection.getAutoCommit());
			statement = shard.getConnection(tinyConnection).createStatement(
					resultSetType, resultSetConcurrency,
					getResultSetHoldability());
			setStatementProperties(statement);
			statementMap.get().put(shard, statement);
		} else {
			if (statement == null) {
				statement = shard.getConnection(tinyConnection)
						.createStatement(resultSetType, resultSetConcurrency,
								getResultSetHoldability());
				setStatementProperties(statement);
				statementMap.get().put(shard, statement);
			}
		}

		return statement;
	}

	protected Statement getNewStatement(String sql, Shard shard)
			throws SQLException {
		Statement statement = shard.getConnection(tinyConnection)
				.createStatement(resultSetType, resultSetConcurrency,
						getResultSetHoldability());
		setStatementProperties(statement);

		return statement;
	}

	protected void setStatementProperties(Statement statement)
			throws SQLException {
		statement.setMaxRows(maxRows);
		statement.setEscapeProcessing(escapeProcessing);
		statement.setQueryTimeout(queryTimeout);
		statement.setFetchSize(fetchSize);
	}

	public void close() throws SQLException {
		StringBuffer buffer = new StringBuffer();
		boolean noError = true;
		for (Statement statement : statementMap.get().values()) {
			try {
				statement.close();
			} catch (SQLException e) {
				buffer.append(String
						.format("statement close error,errorcode:%s,sqlstate:%s,message:%s \n",
								e.getErrorCode(), e.getSQLState(),
								e.getMessage()));
				noError = false;
				logger.errorMessage("statement close error", e);
			}

		}
		statementMap.get().clear();
		if (pool != null) {
			pool.shutdown();
		}
		this.isClosed = true;
		if (!noError) {
			throw new SQLException(buffer.toString());
		}
	}

	/**
	 * INTERNAL. Close and old result set if there is still one open.
	 */
	protected void closeOldResultSet() throws SQLException {
		try {
			if (!closedByResultSet) {
				if (resultSet != null) {
					resultSet.close();
				}
			}
		} finally {
			cancelled = false;
			resultSet = null;
			updateCount = 0;
		}
	}

	/**
	 * Check whether the statement was cancelled.
	 * 
	 * @return true if yes
	 */
	public boolean wasCancelled() {
		return cancelled;
	}

	protected void checkClosed() throws SQLException {
		tinyConnection.checkClosed();
		if (isClosed) {
			throw new SQLException("statement is closed");
		}
	}

	public int getMaxFieldSize() throws SQLException {
		checkClosed();
		return 0;
	}

	public void setMaxFieldSize(int max) throws SQLException {
		checkClosed();
	}

	public int getMaxRows() throws SQLException {
		return maxRows;
	}

	public void setMaxRows(int max) throws SQLException {
		checkClosed();
		if (maxRows < 0) {
			throw new SQLException("not valid value for maxRows:" + maxRows);
		}
		this.maxRows = max;
	}

	public void setEscapeProcessing(boolean enable) throws SQLException {
		checkClosed();
		this.escapeProcessing = enable;
	}

	public int getQueryTimeout() throws SQLException {
		checkClosed();
		return queryTimeout;
	}

	public void setQueryTimeout(int seconds) throws SQLException {
		checkClosed();
		this.queryTimeout = seconds;
	}

	public void cancel() throws SQLException {
		checkClosed();
		for (Statement statement : statementMap.get().values()) {
			statement.cancel();
		}
		cancelled = true;

	}

	public SQLWarning getWarnings() throws SQLException {
		checkClosed();
		return null;
	}

	public void clearWarnings() throws SQLException {
		checkClosed();
	}

	/**
	 * Sets the name of the cursor. This call is ignored.
	 */
	public void setCursorName(String name) throws SQLException {
		checkClosed();
	}

	public boolean execute(String sql) throws SQLException {
		org.tinygroup.jsqlparser.statement.Statement statement = routerManager
				.getSqlStatement(sql);
		boolean returnsResultSet = false;
		if (statement instanceof Select) {
			returnsResultSet = true;
			executeQuery(sql);
		} else {
			returnsResultSet = false;
			executeUpdate(sql);
		}
		return returnsResultSet;
	}

	private List<Shard> getPrimarySlaveShard(StatementExecuteContext context)
			throws SQLException {
		Partition partition = context.getPartition();
		if (context.isRead()) {
			return getReadShards(partition);
		}
		return getWriteShards(partition);
	}

	private List<Shard> getReadShards(Partition partition) throws SQLException {
		List<Shard> shards = new ArrayList<Shard>();
		Shard shard = null;
		if (isTransaction()) {// 从写的列表中随机选择一个进行读取
			shard = getShardWithTransaction(partition);
		} else {
			shard = getShardWithNoTransaction(partition);
		}
		shards.add(shard);
		return shards;
	}

	private Shard getShardWithTransaction(Partition partition) {
		return routerManager.getShardBalance().getReadShardWithTransaction(
				partition);
	}

	private Shard getShardWithNoTransaction(Partition partition) {
		return routerManager.getShardBalance().getReadableShard(partition);
	}

	private List<Shard> getWriteShards(Partition partition) {
		List<Shard> shards = new ArrayList<Shard>();
		shards.addAll(routerManager.getShardBalance().getWritableShard(
				partition));
		return shards;
	}

	/**
	 * 存在事务的情况
	 * 
	 * @return
	 * @throws SQLException
	 */
	private boolean isTransaction() throws SQLException {
		return !tinyConnection.getAutoCommit();
	}

	public ResultSet getResultSet() throws SQLException {
		checkClosed();
		return resultSet;
	}

	public int getUpdateCount() throws SQLException {
		checkClosed();
		return updateCount;
	}

	public boolean getMoreResults() throws SQLException {
		checkClosed();
		closeOldResultSet();
		return false;
	}

	public void setFetchDirection(int direction) throws SQLException {
		checkClosed();
	}

	public int getFetchDirection() throws SQLException {
		checkClosed();
		return ResultSet.FETCH_FORWARD;
	}

	public void setFetchSize(int rows) throws SQLException {
		checkClosed();
		if (rows < 0 || (rows > 0 && maxRows > 0 && rows > maxRows)) {
			throw new SQLException("invalid value for rows:" + rows);
		}
		if (rows == 0) {
			rows = 100;
		}
		this.fetchSize = rows;

	}

	public int getFetchSize() throws SQLException {
		checkClosed();
		return fetchSize;
	}

	public int getResultSetConcurrency() throws SQLException {
		checkClosed();
		return resultSetConcurrency;
	}

	public int getResultSetType() throws SQLException {
		checkClosed();
		return resultSetType;
	}

	public void addBatch(String sql) throws SQLException {
		checkClosed();
		StatementExecuteContext context = initStatementContext(sql);
		List<RealStatementExecutor> statements = getStatementsByContext(context);
		for (RealStatementExecutor statement : statements) {
			statement.addBatch();
		}
	}

	/**
	 * 根据sql获取需要执行的statements列表
	 * 
	 * @param sql
	 * @return
	 * @throws SQLException
	 */
	protected List<RealStatementExecutor> getStatementsByContext(
			StatementExecuteContext context) throws SQLException {
		Partition partition = context.getPartition();
		if (partition.getMode() == Partition.MODE_PRIMARY_SLAVE) {
			return statementsWithMasterSlaveMode(context);
		} else {
			return statementsByShardMode(context);
		}
	}

	private List<RealStatementExecutor> statementsByShardMode(
			StatementExecuteContext context) throws SQLException {
		Partition partition = context.getPartition();
		String sql = context.getOrignalSql();
		List<RealStatementExecutor> statements = new ArrayList<RealStatementExecutor>();
		String transSql = insertSpecialHandle(partition, sql);// insert语句需要特殊处理
		transSql = transformSqlWithStamentProcessor(transSql, context);
		Collection<Shard> shards = routerManager.getShards(partition, transSql,
				getPreparedParams());
		if (shards.size() == 0) {// 如果一个都没有符合的，就匹配所有的shard。
			shards = partition.getShards();
		}
		Iterator<Shard> iterator = shards.iterator();
		while (iterator.hasNext()) {
			Shard shard = iterator.next();
			String realSql = routerManager.getSql(partition, shard, transSql,
					getPreparedParams());// 变化表名等
			Statement realStatement = getStatement(shard, realSql);
			statements.add(new RealStatementExecutor(realStatement, realSql,
					sql, shard, partition, router, getPreparedParams()));
		}
		return statements;
	}

	private String transformSqlWithStamentProcessor(String transSql,
			StatementExecuteContext context) {
		for (StatementProcessor processor : routerManager
				.getStatementProcessorList()) {
			if (processor.isMatch(transSql, getPreparedParams())) {
				context.setStatementProcessor(processor);
				return processor.getSql(transSql, context);
			}
		}
		return transSql;
	}

	@SuppressWarnings("rawtypes")
	private String insertSpecialHandle(Partition partition, String sql)
			throws SQLException {
		Shard firstShard = partition.getShards().get(0);// 获取第一个分片
		RouterKeyGenerator generator = router.getKeyGenerator();
		String transSql = sql;
		if (generator != null) {
			org.tinygroup.jsqlparser.statement.Statement statement = RouterManagerBeanFactory
					.getManager().getSqlStatement(sql);
			if (statement instanceof Insert) {
				transSql = handleInsertStatement(firstShard, transSql,
						(Insert) statement);
			}
		}
		return transSql;
	}

	private List<RealStatementExecutor> statementsWithMasterSlaveMode(
			StatementExecuteContext context) throws SQLException {
		Partition partition = context.getPartition();
		String sql = context.getOrignalSql();
		List<RealStatementExecutor> statements = new ArrayList<RealStatementExecutor>();
		List<Shard> shards = getPrimarySlaveShard(context);
		for (Shard shard : shards) {
			String realSql = routerManager.getSql(partition, shard, sql,
					getPreparedParams());
			Statement realStatement = getStatement(shard, realSql);
			statements.add(new RealStatementExecutor(realStatement, realSql,
					sql, shard, partition, router, getPreparedParams()));
		}
		return statements;
	}

	private String handleInsertStatement(Shard firstShard, String sql,
			Insert insert) throws SQLException {
		String transSql = sql;
		String tableName = insert.getTable().getName();
		Cache cache = RouterManagerBeanFactory.getManager().getCache();
		CacheKey cacheKey = new CacheKey();
		cacheKey.update(tableName);
		ColumnInfo primaryColumn = null;
		String keyString = cacheKey.toString();
		try {
			primaryColumn = (ColumnInfo) cache.get(keyString);
		} catch (Exception e) {// 如果获取缓存的时候出现异常，认为此次操作不存在缓存内容
		}
		if (primaryColumn == null) {
			InsertSqlTransform transform = new InsertSqlTransform(insert,
					firstShard, tinyConnection.getMetaData());
			primaryColumn = transform.getPrimaryColumn();
			cache.put(keyString, primaryColumn);
		}
		boolean existPrimary = primaryKeyInColumns(
				primaryColumn.getColumnName(), insert);
		if (!existPrimary) {
			Insert newInsert = createNewInsert(insert, tableName, primaryColumn);
			transSql = newInsert.toString();
		}
		return transSql;
	}

	/**
	 * 创建拥有主键值的insert
	 * 
	 * @param insert
	 * @param tableName
	 * @param primaryColumn
	 * @return
	 * @throws SQLException
	 */
	private Insert createNewInsert(Insert insert, String tableName,
			ColumnInfo primaryColumn) throws SQLException {
		Insert newInsert = null;
		try {
			newInsert = (Insert) DbRouterUtil.deepCopy(insert);
		} catch (Exception e) {
			throw new DbrouterRuntimeException(e);
		}
		newInsert.getColumns().add(primaryColumn.getColumn());
		ItemsList itemsList = newInsert.getItemsList();
		if (itemsList instanceof ExpressionList) {
			List<Expression> expressions = ((ExpressionList) itemsList)
					.getExpressions();
			Expression expression = createExpression(expressions.size() + 1,
					primaryColumn.getDataType(), tableName);
			expressions.add(expression);
		}
		return newInsert;
	}

	protected Expression createExpression(int paramIndex, int dataType,
			String tableName) throws SQLException {
		Expression expression = null;
		switch (dataType) {
		case Types.CHAR:
		case Types.VARCHAR:
		case Types.LONGVARCHAR:
			String value = RouterManagerBeanFactory.getManager().getPrimaryKey(
					router, tableName);
			StringValue stringValue = new StringValue(value);
			expression = stringValue;
			break;
		case Types.NUMERIC:
		case Types.DECIMAL:
		case Types.INTEGER:
		case Types.SMALLINT:
		case Types.TINYINT:
		case Types.BIGINT:
			Object values = RouterManagerBeanFactory.getManager()
					.getPrimaryKey(router, tableName);
			expression = new LongValue(values + "");
			break;
		default:
		}
		return expression;
	}

	private boolean primaryKeyInColumns(String primaryKey, Insert insert) {
		List<Column> columns = insert.getColumns();
		boolean exist = false;
		if (!CollectionUtil.isEmpty(columns)) {
			for (Column column : columns) {
				if (column.getColumnName().equalsIgnoreCase(primaryKey)) {
					exist = true;
					break;
				}
			}
		}
		return exist;
	}

	protected Object[] getPreparedParams() {
		return new Object[0];
	}

	public void clearBatch() throws SQLException {
		checkClosed();
		for (Statement statement : statementMap.get().values()) {
			statement.clearBatch();
		}
	}

	public int[] executeBatch() throws SQLException {
		checkClosed();
		List<Integer> affectList = new ArrayList<Integer>();
		for (Statement statement : statementMap.get().values()) {
			int[] affects = statement.executeBatch();
			for (int affect : affects) {
				affectList.add(affect);
			}
		}
		int[] results = new int[affectList.size()];
		for (int i = 0; i < results.length; i++) {
			results[i] = affectList.get(i);
		}
		return results;
	}

	public Connection getConnection() throws SQLException {
		return tinyConnection;
	}

	public boolean getMoreResults(int current) throws SQLException {
		switch (current) {
		case Statement.CLOSE_CURRENT_RESULT:
		case Statement.CLOSE_ALL_RESULTS:
			checkClosed();
			closeOldResultSet();
			break;
		case Statement.KEEP_CURRENT_RESULT:
			break;
		default:
			throw new SQLException("invalid value for current:" + current);
		}
		return false;
	}

	public ResultSet getGeneratedKeys() throws SQLException {
		throw new SQLException("not support generatedKeys");
	}

	public int executeUpdate(String sql, int autoGeneratedKeys)
			throws SQLException {
		return executeUpdate(sql);
	}

	public int executeUpdate(String sql, int[] columnIndexes)
			throws SQLException {
		return executeUpdate(sql);
	}

	public int executeUpdate(String sql, String[] columnNames)
			throws SQLException {
		return executeUpdate(sql);
	}

	public boolean execute(String sql, int autoGeneratedKeys)
			throws SQLException {
		return execute(sql);
	}

	public boolean execute(String sql, int[] columnIndexes) throws SQLException {
		return execute(sql);
	}

	public boolean execute(String sql, String[] columnNames)
			throws SQLException {
		return execute(sql);
	}

	public int getResultSetHoldability() throws SQLException {
		checkClosed();
		return ResultSet.HOLD_CURSORS_OVER_COMMIT;
	}
}
